RTSP 服务器 源码理解

RTSP 服务器 源码理解

Fd 文件描述符

fd (File Descriptor) 打开的文件或网络连接的一个抽象标识符。每个文件描述符通常是一个整数,在程序运行时用于引用特定的文件或套接字。

管理网络连接:文件描述符用于表示客户端与服务器之间的网络连接。RTSP服务器会为每个连接分配一个文件描述符,以便能够区分和管理多个客户端连接。

非阻塞I/O:通过将文件描述符设置为非阻塞模式,服务器可以在无需等待数据准备好的情况下进行其他操作。这对于高效处理多个并发连接是非常重要的。

回调函数:通过为文件描述符绑定回调函数(如 readCallback),服务器可以在特定事件(如有数据可读)发生时自动调用这些函数,从而处理相应的I/O操作。

select 网络模型select 是一种多路复用技术,用于监视多个文件描述符,以查看哪些文件描述符处于可读、可写或有异常的状态。这使得服务器可以在一个线程中高效地处理多个网络连接。

如何理解 Live555 的Source & Sink

img

Source 发送端, 流的起点, 可直观理解为生产者, 负责读取文件或网络流的信息.
Sink 接收端, 流的终点, 可理解为是消费者。

Source: 可能是RTP读取数据, 从文件中或摄像头设备中等.

Sink: 数据流最终可保存在文件中, 或显示在屏幕上等.

MediaSession: 用于表示一个RTP会话, 一个MediaSession可能包含多个子会话(MediaSubSession),子会话可以是音频子会话、视频子会话等。

MediaSource - 流的源头

https://www.jianshu.com/p/0bdf07f7a5d5)

Source 和 sink 都有一个源头类 MediaSouce是所有Souce的基类

img

MediaSink是所有Sink的基类

img

image-20240722184309778image-20240722184210063image-20240722184229787

以H264进行举例 H264VideoStreamFramer是真正的Souce,它用于从H264文件中读取数据,并组装成帧。

H264VideoFileSink是真正的Sink, 完成将数据保存至文件.

H264VideoRTPSink是真正的Sink, 完成数据的发送

1
2
3
4
5
对于H264码流,数据流的流动方向为:
服务器端:
H264VideoStreamFramer ->H264Or5Fragmenter (Filter)r->H264VideoRTPSink
客户端:
H264RTPSouce -> Sink

setReuseAddr

setReuseAddr 函数在网络编程中用于设置套接字选项,使得端口可以在套接字关闭后立即重新使用。这对于服务器应用程序特别有用,因为它允许服务器在崩溃或重启后快速重新绑定到相同的端口,而不必等待系统默认的时间间隔(通常是几分钟)才能重新使用该端口。

具体来说,setReuseAddr 函数通过设置 SO_REUSEADDR 套接字选项来实现这一功能。

系统结构

RTSP结构图

  • 通过SessionManger 来存储实入流 每一个流都定义为一个session
  • 定时器发送控制信号到SessionManager以维持帧率。
  • SessionManager 管理会话 test 和其包含的流。
  • RTSP服务器接收并处理客户端连接请求。
  • RTSP连接处理具体的流,推送 h264 和 aac 流到客户端。
  • 客户端通过 RTSP 客户端软件接收并播放流媒体内容。

udp传输不需要加入fd

main function

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
int main() {
/*
*
程序初始化了一份session名为test的资源,访问路径如下

// rtp over tcp
ffplay -i -rtsp_transport tcp rtsp://127.0.0.1:8554/test

// rtp over udp
ffplay -i rtsp://127.0.0.1:8554/test

*/

srand(time(NULL));//时间初始化
// 事件调度
EventScheduler* scheduler = EventScheduler::createNew(EventScheduler::POLLER_SELECT);
// 读取资源的线程池
ThreadPool* threadPool = ThreadPool::createNew(1);//
// 资源管理器
MediaSessionManager* sessMgr = MediaSessionManager::createNew();
// 简化版live555 将scheduler实例和threadpool实例封装 方便传递
UsageEnvironment* env = UsageEnvironment::createNew(scheduler, threadPool);

Ipv4Address rtspAddr("127.0.0.1", 8554);
// 创建RTSP server
RtspServer* rtspServer = RtspServer::createNew(env, sessMgr,rtspAddr);

LOGI("----------session init start------");
{
MediaSession* session = MediaSession::createNew("test");
MediaSource* source = H264FileMediaSource::createNew(env, "../data/daliu.h264");
Sink* sink = H264FileSink::createNew(env, source);
session->addSink(MediaSession::TrackId0, sink);

source = AACFileMeidaSource::createNew(env, "../data/daliu.aac");
sink = AACFileSink::createNew(env, source);
session->addSink(MediaSession::TrackId1, sink);

//session->startMulticast(); //多播
sessMgr->addSession(session);
}
LOGI("----------session init end------");


rtspServer->start();

env->scheduler()->loop();
return 0;
}

创建RTSP Server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
RtspServer::RtspServer(UsageEnvironment* env, MediaSessionManager* sessMgr, Ipv4Address& addr) :
mSessMgr(sessMgr),
mEnv(env),
mAddr(addr),
mListen(false)
{
// 创建RTSP Server描述符 为非阻塞
mFd = sockets::createTcpSock();
// 重用端口
sockets::setReuseAddr(mFd, 1);
// 绑定
if (!sockets::bind(mFd, addr.getIp(), mAddr.getPort())) {
return;
}

LOGI("rtsp://%s:%d fd=%d",addr.getIp().data(),addr.getPort(), mFd);
// RTSP server的描述符作为参数传给IO事件
mAcceptIOEvent = IOEvent::createNew(mFd, this);
mAcceptIOEvent->setReadCallback(readCallback);//设置回调的socket可读 函数指针
mAcceptIOEvent->enableReadHandling();

mCloseTriggerEvent = TriggerEvent::createNew(this);
mCloseTriggerEvent->setTriggerCallback(cbCloseConnect);//设置回调的关闭连接 函数指针

}
1
2
3
4
5
6
7
8
9
10
11
12
void RtspServer::handleRead() {
int clientFd = sockets::accept(mFd);
if (clientFd < 0)
{
LOGE("handleRead error,clientFd=%d",clientFd);
return;
}
RtspConnection* conn = RtspConnection::createNew(this, clientFd);
conn->setDisConnectCallback(RtspServer::cbDisConnect, this);
mConnMap.insert(std::make_pair(clientFd, conn));

}

回调函数调用

1
2
3
4
5
6
7
8
9
10
11
12
void RtspServer::handleRead() {
int clientFd = sockets::accept(mFd);
if (clientFd < 0)
{
LOGE("handleRead error,clientFd=%d",clientFd);
return;
}
RtspConnection* conn = RtspConnection::createNew(this, clientFd);
conn->setDisConnectCallback(RtspServer::cbDisConnect, this);
mConnMap.insert(std::make_pair(clientFd, conn));

}

在创建RTSP Sever实例后,只要RTSP Server的描述符出现了可读事件(readCallback)>> 执行 HandelRead(),然后创建ClientFd,监收请求的客户端描述符。当ClientFd不小于0,创建一个连接,当结束时 触发回调函数,结束连接。

1
2
3
4
5
6
7
8
9
void RtspServer::handleDisConnect(int clientFd) {

std::lock_guard <std::mutex> lck(mMtx);
// 将clientFd插入到取消连接的队列里
mDisConnList.push_back(clientFd);
// 加入触发事件 关闭连接
mEnv->scheduler()->addTriggerEvent(mCloseTriggerEvent);

}

在RTSP Server里,包含triggerEvent实例 当事件加入到调度中,被处理到的时候激活函数

handleDisconnect中只是加入了队列,并没有人去执行,Trigger Event 被激活之后 回调到cbCloseConnect,然后被handleCloseConnect处理

handleCloseConnect遍历队列 清理连接

1
2
3
4
5
6
7
8
9
10
11
MediaSession* session = MediaSession::createNew("test");
MediaSource* source = H264FileMediaSource::createNew(env, "../data/daliu.h264");
Sink* sink = H264FileSink::createNew(env, source);
session->addSink(MediaSession::TrackId0, sink);

source = AACFileMeidaSource::createNew(env, "../data/daliu.aac");
sink = AACFileSink::createNew(env, source);
session->addSink(MediaSession::TrackId1, sink);

//session->startMulticast(); //多播
sessMgr->addSession(session);

通过source 传给sink资源

两路流 trackId0和trackId1都加入session里

sessMgr -> addSession(session)

RtspServer start

1
2
3
4
5
6
void RtspServer::start(){
LOGI("");
mListen = true; // 设置mListen
sockets::listen(mFd, 60);
mEnv->scheduler()->addIOEvent(mAcceptIOEvent); // 创建IO事件 加入队列循环
}

Scheduler loop

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#ifdef WIN32
std::thread([](EventScheduler* sch) {
// 定时器
while (!sch->mQuit) {
if (sch->mTimerManagerReadCallback) {
sch->mTimerManagerReadCallback(sch->mTimerManagerArg);
}
}
}, this).detach();
#endif // WIN32

while (!mQuit) {
// 当没有退出的时候 执行触发事件
handleTriggerEvents();
mPoller->handleEvent();
}
}

void EventScheduler::handleTriggerEvents()
{
if (!mTriggerEvents.empty())
{
for (std::vector<TriggerEvent*>::iterator it = mTriggerEvents.begin();
it != mTriggerEvents.end(); ++it)
{
(*it)->handleEvent();
}

mTriggerEvents.clear();
}
}

Poller 网络模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
switch (type) {

case POLLER_SELECT:
mPoller = SelectPoller::createNew();
break;

//case POLLER_POLL:
// mPoller = PollPoller::createNew();
// break;

//case POLLER_EPOLL:
// mPoller = EPollPoller::createNew();
// break;

default:
_exit(-1);
break;
}
mTimerManager = TimerManager::createNew(this);//WIN系统的定时器回调由子线程托管,非WIN系统则通过select网络模型

这里需要一个定时器管理器 面对多路流传输

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 定时器事件
mTimerFd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);

if (mTimerFd < 0) {
LOGE("create TimerFd error");
return;
}else{
LOGI("fd=%d",mTimerFd);
}
// 创建IO事件 设置readCallback的回调,并进行激活,并加入poller
mTimerIOEvent = IOEvent::createNew(mTimerFd, this);
mTimerIOEvent->setReadCallback(readCallback);
mTimerIOEvent->enableReadHandling();
modifyTimeout();
mPoller->addIOEvent(mTimerIOEvent);
#else
// Win系统设置回调函数,和他保持一致
scheduler->setTimerManagerReadCallback(readCallback, this);

setTimerManagerReadCallback

1
2
3
4
5
6
void EventScheduler::setTimerManagerReadCallback(EventCallback cb, void* arg)
{
// 设置给时间管理器的可读回调和实例对象
mTimerManagerReadCallback = cb;
mTimerManagerArg = arg;
}

如果设置fps为25

定时器每一秒回调25次 40ms/t

定时器事件不为空,则去除事件计算差值,在sink里创建事件事件

1
2
3
LOGI("Sink()");
mTimerEvent = TimerEvent::createNew(this);
mTimerEvent->setTimeoutCallback(cbTimeout);

handelTimeout

1
2
3
4
5
6
7
8
9
10
11
12
13
void Sink::cbTimeout(void *arg) {
Sink* sink = (Sink*)arg;
sink->handleTimeout();
}
void Sink::handleTimeout() {
MediaFrame* frame = mMediaSource->getFrameFromOutputQueue();
if (!frame) {
return;
}
this->sendFrame(frame);// 由具体子类实现发送逻辑

mMediaSource->putFrameToInputQueue(frame);//将使用过的frame插入输入队列,插入输入队列以后,加入一个子线程task,从文件中读取数据再次将输入写入到frame
}

Task

加入线程池 每个子线程都读取一个队列 没有则阻塞等待

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 将task加入队列
void ThreadPool::addTask(ThreadPool::Task& task)
{
std::unique_lock <std::mutex> lck(mMtx);
mTaskQueue.push(task);
mCon.notify_one();
}

void ThreadPool::loop(){

while(!mQuit){

std::unique_lock <std::mutex> lck(mMtx);
if (mTaskQueue.empty()) {
mCon.wait(lck);
}


if(mTaskQueue.empty())
continue;

Task task = mTaskQueue.front();
mTaskQueue.pop();
// 如果有值handle回调
task.handle();
}

}

handle最终调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
void H264FileMediaSource::handleTask()
{
std::lock_guard <std::mutex> lck(mMtx);

// 从输入队列读取frame
if (mFrameInputQueue.empty())
return;

// 输入队列读取
MediaFrame* frame = mFrameInputQueue.front();
int startCodeNum = 0;

while (true)
{
frame->mSize = getFrameFromH264File(frame->temp, FRAME_MAX_SIZE);
if (frame->mSize < 0) {
return;
}
if (startCode3(frame->temp)){
startCodeNum = 3;
}else{
startCodeNum = 4;
}
frame->mBuf = frame->temp + startCodeNum;
frame->mSize -= startCodeNum;

uint8_t naluType = frame->mBuf[0] & 0x1F;
//LOGI("startCodeNum=%d,naluType=%d,naluSize=%d", startCodeNum, naluType, frame->mSize);

if (0x09 == naluType) {
// discard the type byte
continue;
}
else if (0x07 == naluType || 0x08 == naluType) {
//continue;
break;
}
else {
break;
}
}

// 弹出输入队列 加入输出队列
mFrameInputQueue.pop();
mFrameOutputQueue.push(frame);
}